package ins.framework.kafkastream.processor;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * Pass-through stream processor: counts and prints every record it receives,
 * then forwards the record unchanged to whatever is wired downstream.
 */
public class ProcessorA implements Processor<String, String> {

    /**
     * Argument handed to {@code ProcessorContext.schedule}.
     * NOTE(review): presumably the punctuation interval in milliseconds —
     * confirm against the Kafka Streams version in use.
     */
    private static final long SCHEDULE_INTERVAL = 1000;

    /** Number of records seen by {@link #process(String, String)} so far. */
    private int processedCount = 0;

    /** Context supplied in {@link #init(ProcessorContext)}; used to forward records. */
    private ProcessorContext context;

    /**
     * Stores the supplied context and registers the schedule on it.
     *
     * @param processorContext context provided by the framework for this processor
     */
    @Override
    public void init(ProcessorContext processorContext) {
        context = processorContext;
        context.schedule(SCHEDULE_INTERVAL);
    }

    /**
     * Increments the record counter, prints a trace line to standard output,
     * and forwards the original key/value pair.
     *
     * @param key   record key (printed as-is and forwarded)
     * @param value record value (printed as-is and forwarded)
     */
    @Override
    public void process(String key, String value) {
        processedCount += 1;
        System.out.println(processedCount + ":" + key + "MyProcessor A  ----   " + value);
        // Hand the processed record to the downstream processor: e.g. if this is
        // processor1, forward() is what moves the data on to processor2.
        // Note that the original value is forwarded, not the formatted trace line.
        context.forward(key, value);
    }

    /**
     * Intentionally does nothing; this processor performs no periodic work.
     *
     * @param timestamp timestamp passed by the framework (unused)
     */
    @Override
    public void punctuate(long timestamp) {
        // no-op
    }

    /** Prints a marker to standard output when the processor is closed. */
    @Override
    public void close() {
        System.out.println("close a");
    }
}
